/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/* ========================================================================== */
/*                                                                            */
/*   Filename.c                                                               */
/*   (c) 2001 Author                                                          */
/*                                                                            */
/*   Description                                                              */
/*                                                                            */
/* ========================================================================== */
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include "we_indextree.h"
#include "we_indexlist.h"

using namespace std;

namespace WriteEngine
{
/************************************************
 * Description:
 * Find a entry for the given rowId and Key
 * Then Delete it from the list
 * Move the rest of the row id up in the same
 * sub block an decrement the count in that subblock
 * decrement the header size
 * Converted
 * input
 *     pFile       -- File Handler
 *     rowId       -- row id
 *     key         -- value
 *     curIdxRidListHdrPtr - point to the header
 *
 * return value
 *        Success -- 0
 *        Fail    -- ERR_IDX_LIST_INVALID_DELETE
 ************************************************/
const int IndexList::deleteInSub(const RID& rowId)
{
  int rc = ERR_IDX_LIST_INVALID_DELETE;
  DataBlock prevDataBlock;
  int pos = 0, totalbytes = 0;
  IdxRidListPtr* lastIdxRidListPtr;
  int type;

  CommBlock cb;
  cb.file.oid = m_oid;
  cb.file.pFile = m_pFile;

  // get thelbid sbid and entry out from the header last entry
  // First Sub-block
  m_lbid = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->fbo;
  m_sbid = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->sbid;
  m_entry = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->entry;
  // Read the pointer entry at LIST_SUB_LLP_POS location

  IdxRidListEntry rowIdArray[MAX_BLOCK_ENTRY];
  IdxRidListEntry newRowIdArray[MAX_BLOCK_ENTRY];
  memset(rowIdArray, 0, BYTE_PER_BLOCK);
  memset(newRowIdArray, 0, BYTE_PER_BLOCK);
  // First link
  pos = LIST_SUB_LLP_POS;
  totalbytes = SUBBLOCK_TOTAL_BYTES;
  m_entryGroup = ENTRY_32;

  if (m_lbid != m_hdrLbid)
  {
    rc = readDBFile(cb, &m_curBlock, m_lbid);

    if (rc != NO_ERROR)
      return rc;

    rc = readSubBlockEntry(cb, &m_curBlock, m_lbid, m_sbid, 0, totalbytes, rowIdArray);

    if (rc != NO_ERROR)
      return rc;

    m_curBlock.dirty = true;
    m_curBlock.lbid = m_lbid;
    m_curBlock.state = BLK_READ;
  }
  else
  {
    if (m_hdrBlock.state >= BLK_READ)
      getSubBlockEntry(m_hdrBlock.data, m_sbid, 0, totalbytes, rowIdArray);
    else
      return ERR_IDX_LIST_INVALID_DELETE;
  }

  lastIdxRidListPtr = (IdxRidListPtr*)&rowIdArray[pos];
  int count;
  type = lastIdxRidListPtr->type;    // next type
  count = lastIdxRidListPtr->count;  // current count

  for (int i = 0; i < count; i++)
  {
    if (rowIdArray[i].rid == rowId)
    {
      // found
      m_dLbid = m_lbid;
      m_dSbid = m_sbid;
      m_dEntry = i;
      rc = NO_ERROR;
      memcpy(&newRowIdArray[0], &rowIdArray[0], totalbytes);
      lastIdxRidListPtr->count--;

      if (lastIdxRidListPtr->count == 0)
      {
        if (type == LIST_SIZE_TYPE)
        {
          // header has no link
          m_curIdxRidListHdr.nextIdxRidListPtr.type = LIST_NOT_USED_TYPE;
          m_curIdxRidListHdr.nextIdxRidListPtr.llp = 0;
        }
      }     // header's link block has nothing now
      else  // still have more
      {
        memcpy(&rowIdArray[i], &newRowIdArray[i + 1], (count - (i + 1)) * LIST_ENTRY_WIDTH);
      }

      // last row id entry now moved up, so not used
      rowIdArray[count - 1].type = LIST_NOT_USED_TYPE;
      rowIdArray[count - 1].rid = 0;
      rowIdArray[count - 1].spare = 0;
      // header update the size
      m_curIdxRidListHdr.idxRidListSize.size--;

      if (m_lbid != m_hdrLbid)
      {
        setSubBlockEntry(m_curBlock.data, m_sbid, 0, totalbytes, rowIdArray);
        rc = writeDBFile(cb, m_curBlock.data, m_lbid);

        if (rc != NO_ERROR)
          return rc;

        m_curBlock.state = BLK_READ;
        rc = writeSubBlockEntry(cb, &m_hdrBlock, m_hdrLbid, m_hdrSbid, m_hdrEntry, LIST_HDR_SIZE,
                                &m_curIdxRidListHdr);

        if (rc != NO_ERROR)
          return rc;

        m_hdrBlock.state = BLK_READ;
      }
      else
      {
        // m_lbid==m_hdrLbid
        setSubBlockEntry(m_hdrBlock.data, m_sbid, 0, totalbytes, rowIdArray);
        setSubBlockEntry(m_hdrBlock.data, m_hdrSbid, m_hdrEntry, LIST_HDR_SIZE, &m_curIdxRidListHdr);
        m_hdrBlock.state = BLK_WRITE;
        rc = writeDBFile(cb, m_hdrBlock.data, m_hdrLbid);

        if (rc != NO_ERROR)
          return rc;

        m_hdrBlock.state = BLK_READ;
      }  // end if m_lbid==m_hdrHdrLbid

      m_dEntry = i;
      return rc;
    }  // endif  found
  }    // end for

  return rc;
}
/************************************************
 * Description:
 * Find a entry for the given rowId and Key
 * Then Delete it from the list
 * Move the rest of the row id up in the same
 * sub block an decrement the count in that subblock
 * decrement the header size
 * Converted
 * input
 *     pFile       -- File Handler
 *     rowId       -- row id
 *     key         -- value
 *     curIdxRidListHdrPtr - point to the header
 *
 * return value
 *        Success -- 0
 *        Fail    -- ERR_IDX_LIST_INVALID_DELETE
 ************************************************/

const int IndexList::deleteInBlock(const RID& rowId)
{
  int width = LIST_ENTRY_WIDTH;
  int rc = ERR_IDX_LIST_INVALID_DELETE;
  IdxRidListPtr* lastIdxRidListPtr;
  IdxRidListPtr lastSubIdxRidListPtr;
  bool found;
  int type, count;
  IdxRidListPtr prevIdxRidListPtr;
  int prevSbid, prevEntry, prevType;
  uint64_t prevLbid;
  DataBlock prevDataBlock;
  int pos = 0, totalbytes = 0;
  int preTotalBytes, prevPos;
  // IdxRidNextListPtr *nextIdxListPtr;

  IdxRidListEntry rowIdArray[MAX_BLOCK_ENTRY];
  IdxRidListEntry newRowIdArray[MAX_BLOCK_ENTRY];

  CommBlock cb;
  cb.file.oid = m_oid;
  cb.file.pFile = m_pFile;
  // This is the sub block info
  prevLbid = m_lbid;
  prevSbid = m_sbid;
  prevEntry = m_entry;
  prevPos = LIST_SUB_LLP_POS;
  preTotalBytes = SUBBLOCK_TOTAL_BYTES;

  if (prevLbid == m_hdrLbid)
  {
    if (m_hdrBlock.state >= BLK_READ)
      getSubBlockEntry(m_hdrBlock.data, m_sbid, prevPos, LIST_ENTRY_WIDTH, &lastSubIdxRidListPtr);
    else
      return ERR_IDX_LIST_INVALID_DELETE;
  }
  else
  {
    if (m_curBlock.state >= BLK_READ)
      getSubBlockEntry(m_curBlock.data, m_sbid, prevPos, LIST_ENTRY_WIDTH, &lastSubIdxRidListPtr);
    else
      return ERR_IDX_LIST_INVALID_DELETE;
  }

  found = false;
  m_lbid = ((IdxEmptyListEntry*)&lastSubIdxRidListPtr)->fbo;
  m_sbid = 0;
  m_entry = 0;

  type = lastSubIdxRidListPtr.type;
  count = lastSubIdxRidListPtr.count;
  pos = LIST_BLOCK_LLP_POS;
  totalbytes = BYTE_PER_BLOCK;

  // Not found in the first sub
  while ((!found) && (type == LIST_BLOCK_TYPE))
  {
    rc = readSubBlockEntry(cb, &m_curBlock, m_lbid, 0, 0, totalbytes, rowIdArray);

    if (rc != NO_ERROR)
      return rc;

    m_curBlock.dirty = true;
    m_curBlock.state = BLK_READ;
    m_curBlock.lbid = m_lbid;
    prevType = type;  // Save it just in case not found here
    lastIdxRidListPtr = (IdxRidListPtr*)&rowIdArray[pos];
    type = lastIdxRidListPtr->type;
    count = lastIdxRidListPtr->count;

    // prepared for not found in current block
    // find out what is the next type
    // Next Type is needed here
    for (int i = 0; i < count; i++)
    {
      if (rowIdArray[i].rid == rowId)
      {
        // found the rowid
        memcpy(&newRowIdArray[0], &rowIdArray[0], totalbytes);
        found = true;
        m_dLbid = m_lbid;
        m_dSbid = m_sbid;
        m_dEntry = i;
        lastIdxRidListPtr->count--;

        if (lastIdxRidListPtr->count == 0)
        {
          if (!m_useNarray)
          {
            // get the previous value out, could be a sub block
            if (prevLbid == m_hdrLbid)
              getSubBlockEntry(m_hdrBlock.data, prevSbid, prevPos, LIST_ENTRY_WIDTH, &prevIdxRidListPtr);
            else if (prevLbid == m_lbid)
              getSubBlockEntry(m_curBlock.data, prevSbid, prevPos, LIST_ENTRY_WIDTH, &prevIdxRidListPtr);
            else
              rc = readSubBlockEntry(cb, &prevDataBlock, prevLbid, prevSbid, prevPos, LIST_ENTRY_WIDTH,
                                     &prevIdxRidListPtr);

            if (rc != NO_ERROR)
              return rc;

            // check the type before set
            if (type == LIST_BLOCK_TYPE)
            {
              ((IdxEmptyListEntry*)&prevIdxRidListPtr)->fbo = ((IdxEmptyListEntry*)lastIdxRidListPtr)->fbo;
              ((IdxEmptyListEntry*)&prevIdxRidListPtr)->sbid = ((IdxEmptyListEntry*)lastIdxRidListPtr)->sbid;
              ((IdxEmptyListEntry*)&prevIdxRidListPtr)->entry =
                  ((IdxEmptyListEntry*)lastIdxRidListPtr)->entry;
              // safety check
              prevIdxRidListPtr.type = type;
            }
            else  // If no more links, the current one is gone also
            {
              if (prevIdxRidListPtr.count > 0)
              {
                prevIdxRidListPtr.type = 0;
                prevIdxRidListPtr.llp = 0;
              }
              else
              {
                // In case it is a sub block, not released with 0 count
                prevIdxRidListPtr.type = LIST_NOT_USED_TYPE;
                prevIdxRidListPtr.llp = 0;
              }
            }  // end if type =LIST_SUBBLOCK_TYPE,LIST_BLOCK_TYPE

            //;set to LIST_NOT_USED_TYPE--unused before release
            lastIdxRidListPtr->type = LIST_NOT_USED_TYPE;
            lastIdxRidListPtr->llp = 0;

            if (prevPos == LIST_BLOCK_LLP_POS)
            {
              if (prevLbid < m_lastLbid)
                rc = setLastLbid(prevLbid);
            }
          }
        }  // end if count==0
        else
        {
          memcpy(&rowIdArray[i], &newRowIdArray[i + 1], (count - (i + 1)) * LIST_ENTRY_WIDTH);

          if (m_lastLbid > m_lbid)
            rc = setLastLbid(m_lbid);

        }  // count check

        // Found rowId
        rowIdArray[count - 1].type = LIST_NOT_USED_TYPE;
        rowIdArray[count - 1].rid = 0;

        m_curIdxRidListHdr.idxRidListSize.size--;
        // Write Out Put in another routine

        if ((prevLbid == m_hdrLbid) && (m_lbid != m_hdrLbid))
        {
          // AAC --3
          if (!m_useNarray)
          {
            if (lastIdxRidListPtr->count == 0)
            {
              setSubBlockEntry(m_hdrBlock.data, prevSbid, prevPos, width, &prevIdxRidListPtr);
            }
          }

          setSubBlockEntry(m_curBlock.data, m_sbid, 0, totalbytes, rowIdArray);
          setSubBlockEntry(m_hdrBlock.data, m_hdrSbid, m_hdrEntry, LIST_HDR_SIZE, &m_curIdxRidListHdr);
          rc = writeDBFile(cb, m_hdrBlock.data, m_hdrLbid);

          if (rc != NO_ERROR)
            return rc;

          rc = writeDBFile(cb, m_curBlock.data, m_lbid);

          if (rc != NO_ERROR)
            return rc;

          m_hdrBlock.state = BLK_READ;
          m_curBlock.state = BLK_READ;
        }
        else
        {
          // ABC --
          if (!m_useNarray)
          {
            if (lastIdxRidListPtr->count == 0)
            {
              setSubBlockEntry(prevDataBlock.data, prevSbid, prevPos, LIST_ENTRY_WIDTH, &prevIdxRidListPtr);
              rc = writeDBFile(cb, prevDataBlock.data, prevLbid);

              if (rc != NO_ERROR)
                return rc;
            }
          }

          setSubBlockEntry(m_curBlock.data, m_sbid, 0, totalbytes, rowIdArray);
          setSubBlockEntry(m_hdrBlock.data, m_hdrSbid, m_hdrEntry, LIST_HDR_SIZE, &m_curIdxRidListHdr);
          rc = writeDBFile(cb, m_hdrBlock.data, m_hdrLbid);

          if (rc != NO_ERROR)
            return rc;

          rc = writeDBFile(cb, m_curBlock.data, m_lbid);
          memset(m_hdrBlock.data, 0, sizeof(m_hdrBlock.data));

          if (rc != NO_ERROR)
            return rc;

          m_hdrBlock.state = BLK_READ;
          m_curBlock.state = BLK_READ;
        }  // last case A B C  --end 5

        // Done with writing to disk
        // Now we need to release the segment
        if (!m_useNarray)
        {
          if (lastIdxRidListPtr->count == 0)
          {
            rc = releaseSegment();

            if (rc != NO_ERROR)
              return rc;
          }  // end release segment when count ==0
        }

        m_entry = i;  // for use in findRow ID
        return rc;    // DONE !!!found then we return, no need to go on
      }               // FOUND THE ROWID returned !!!!
    }                 // for loop i not found continue to i++

    // NOT FOUND in this block go to next block
    // assigning the current llp as previous llp:lbid, sbid, entry
    prevLbid = m_lbid;
    prevSbid = 0;
    prevEntry = 0;
    prevPos = pos;
    preTotalBytes = totalbytes;

    m_lbid = ((IdxEmptyListEntry*)lastIdxRidListPtr)->fbo;
    m_sbid = 0;
    m_entry = 0;
  }  // end while

  if (!found)
    rc = ERR_IDX_LIST_INVALID_DELETE;

  return rc;
}

/************************************************
 * Description:
 * Converted
 * Find an entry for the given rowId and Key
 * Then Delete it from the list
 * Move the rest of the row id up in the same
 * sub block an decrement the count in that subblock
 * decrement the header size
 * input
 *     pFile       -- File Handler
 *     rowId       -- row id
 *     key         -- value
 *     curIdxRidListHdrPtr - point to the header
 *
 * return value
 *        Success -- 0
 *        Fail    -- ERR_IDX_LIST_INVALID_DELETE
 ************************************************/
const int IndexList::deleteIndexList(FILE* pFile, const RID& rowId, const uint64_t& key,
                                     IdxEmptyListEntry* curIdxRidListHdrPtr, uint64_t& lbid, int& sbid,
                                     int& entry)
{
  int rc = ERR_IDX_LIST_INVALID_DELETE;
  bool found = false;

  m_pFile = pFile;
  getHdrInfo(curIdxRidListHdrPtr);

  if (key != m_curIdxRidListHdr.key)
  {
    return ERR_IDX_LIST_INVALID_KEY;
  }

  uint64_t dlbid = -1LL;
  int dsbid = -1;
  int dentry = -1;
  rc = deleteIndexList(rowId, key, dlbid, dsbid, dentry);

  if (rc != NO_ERROR)
  {
    lbid = -1LL;
    sbid = -1;
    entry = -1;
    found = false;
    return rc;
  }
  else
  {
    lbid = dlbid;
    sbid = dsbid;
    entry = dentry;
  }

  return rc;
}
/************************************************
 * Description:
 * No change
 * Find a entry for the given rowId and Key
 * Then Delete it from the list
 * Move the rest of the row id up in the same
 * sub block an decrement the count in that subblock
 * decrement the header size
 * input
 *     pFile       -- File Handler
 *     rowId       -- row id
 *     key         -- value
 *     curIdxRidListHdrPtr - point to the header
 *
 * return value
 *        Success -- 0
 *        Fail    -- ERR_IDX_LIST_INVALID_DELETE
 ************************************************/
const int IndexList::deleteIndexList(const RID& rowId, const uint64_t& key, uint64_t& lbid, int& sbid,
                                     int& entry)
{
  int rc = ERR_IDX_LIST_INVALID_DELETE;
  rc = deleteIndexList(rowId, key);

  lbid = m_dLbid;
  sbid = m_dSbid;
  entry = m_dEntry;
  return rc;
}
/************************************************
 * Description:
 * Converted
 * Find all of the row Id or toke from list
 * input
 *     pFile       -- File Handler
 *     curIdxRidListHdrPtr - point to the header
 *
 * return value
 *        Success -- 0
 *        Fail    -- ERR_IDX_LIST_INVALID_DELETE
 ************************************************/
const int IndexList::getRIDArrayFromListHdr(FILE* pFile, uint64_t& key,
                                            IdxEmptyListEntry* curIdxRidListHdrPtr, RID* ridArray, int& size)
{
  int rc = NO_ERROR;
  int arrayCount = 0;
  IdxRidNextListPtr* nextIdxListPtr = NULL;

  m_pFile = pFile;
  CommBlock cb;
  cb.file.oid = m_oid;
  cb.file.pFile = m_pFile;

  rc = getHdrInfo(curIdxRidListHdrPtr);

  if (m_curIdxRidListHdr.idxRidListSize.size == 0)
  {
    size = 0;
    return NO_ERROR;
  }

  if (key != m_curIdxRidListHdr.key)
  {
    return ERR_IDX_LIST_WRONG_KEY;
  }

  // cout << "IndexList::getRIDArrayFromListHdr->KEY ------>" << key << endl;
  // Check the first row location, 3rd enty
  if (m_curIdxRidListHdr.firstIdxRidListEntry.type == (int)LIST_RID_TYPE)
  {
    ridArray[arrayCount] = (RID)m_curIdxRidListHdr.firstIdxRidListEntry.rid;
    // cout<<" IndexList::getRIDArrayFromListHdr->header Lbid->" << m_hdrLbid <<" count->" << arrayCount
    // <<endl;

    arrayCount++;
    // cout << "RID = " << (RID)m_curIdxRidListHdr.firstIdxRidListEntry.rid << endl;
  };

  // Check Header last entry's type
  int type = m_curIdxRidListHdr.nextIdxRidListPtr.type;

  switch (type)
  {
    case LIST_RID_TYPE:  // There is a row id here, Check!
      ridArray[arrayCount] = (RID)m_curIdxRidListHdr.nextIdxRidListPtr.llp;
      // cout<<"arrayCount->" << arrayCount << "rid->" << ridArray[arrayCount]<<endl;
      arrayCount++;
      // cout << "RID = " << (RID)m_curIdxRidListHdr.nextIdxRidListPtr.llp << endl;

      size = arrayCount;
      return NO_ERROR;

    case LIST_SUBBLOCK_TYPE:  // Not found in header, so go to the sub-block
      // get thelbid sbid and entry out from the header last entry

      m_lbid = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->fbo;
      m_sbid = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->sbid;
      m_entry = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->entry;
      m_curType = type;
      // Read the pointer entry at LIST_SUB_LLP_POS location
      IdxRidListPtr* lastIdxRidListPtr;
      IdxRidListEntry rowIdArray[MAX_BLOCK_ENTRY];
      memset(rowIdArray, 0, BYTE_PER_BLOCK);
      int pos = 0, totalbytes = 0;
      pos = LIST_SUB_LLP_POS;
      totalbytes = SUBBLOCK_TOTAL_BYTES;

      if (m_lbid != m_hdrLbid)
      {
        rc = readSubBlockEntry(cb, &m_curBlock, m_lbid, m_sbid, 0, totalbytes, rowIdArray);
        m_curBlock.lbid = m_lbid;
        m_curBlock.state = BLK_READ;
        m_curBlock.dirty = true;
      }
      else
        getSubBlockEntry(m_hdrBlock.data, m_sbid, 0, totalbytes, rowIdArray);

      int type, count;

      lastIdxRidListPtr = (IdxRidListPtr*)&rowIdArray[pos];
      type = lastIdxRidListPtr->type;
      count = lastIdxRidListPtr->count;

      // cout << "count->" << count << endl;
      // type should be LIST_BLOCK_TYPE from now on
      for (int i = 0; i < count; i++)
      {
        ridArray[arrayCount] = (RID)(rowIdArray[i].rid);
        // cout << "RID =" << (RID)(rowIdArray[i].rid) << endl;
        // cout<<"arrayCount->" << arrayCount << "rid->" << ridArray[arrayCount]<<endl;
        arrayCount++;
      }

      // cout << "    Lbid->" << m_lbid ;
      // cout << "    count->" << count << endl;
      m_lbid = ((IdxEmptyListEntry*)lastIdxRidListPtr)->fbo;

      while (type == LIST_BLOCK_TYPE)
      {
        // cout << "    Lbid->" << m_lbid ;

        pos = LIST_BLOCK_LLP_POS;
        totalbytes = BYTE_PER_BLOCK;
        rc = readSubBlockEntry(cb, &m_curBlock, m_lbid, 0, 0, totalbytes, rowIdArray);
        m_curBlock.lbid = m_lbid;
        m_curBlock.state = BLK_READ;
        m_curBlock.dirty = true;

        if (!m_useNarray)
        {
          lastIdxRidListPtr = (IdxRidListPtr*)&rowIdArray[pos];
          type = lastIdxRidListPtr->type;
          count = lastIdxRidListPtr->count;
        }
        else
        {
          nextIdxListPtr = (IdxRidNextListPtr*)&rowIdArray[pos];
          type = nextIdxListPtr->type;
          count = nextIdxListPtr->count;
        }

        // cout << "    count->" << count << endl;
        for (int i = 0; i < count; i++)
        {
          ridArray[arrayCount] = (RID)(rowIdArray[i].rid);
          // cout << "RID =" << (RID)(rowIdArray[i].rid) << endl;
          // cout<<"arrayCount->" << arrayCount << "rid->" << ridArray[arrayCount]<<endl;
          arrayCount++;
        }

        if (type == LIST_BLOCK_TYPE)
        {
          if (m_useNarray)
            m_lbid = nextIdxListPtr->nextLbid;
          else
            m_lbid = ((IdxEmptyListEntry*)lastIdxRidListPtr)->fbo;
        }

      }  // end while
  };     // end of switch

  size = arrayCount;

  return rc;
}  // end getRIDArrayFromListHdr

const int IndexList::getRIDArrayFromListHdrNarray(FILE* pFile, uint64_t& key,
                                                  IdxEmptyListEntry* curIdxRidListHdrPtr, RID* ridArray,
                                                  int& size, bool flag)
{
  int rc = NO_ERROR;
  IdxRidNextListPtr* nextIdxListPtr;
  int pos = 0, totalbytes = 0;
  IdxRidListPtr* lastIdxRidListPtr;
  IdxRidListEntry rowIdArray[MAX_BLOCK_ENTRY * 10];
  int type = 0, count = 0;

  m_pFile = pFile;
  CommBlock cb;
  cb.file.oid = m_oid;
  cb.file.pFile = m_pFile;

  if (flag)
  {
    rc = getHdrInfo(curIdxRidListHdrPtr);

    if (m_curIdxRidListHdr.idxRidListSize.size == 0)
    {
      size = 0;
      return NO_ERROR;
    }

    if (key != m_curIdxRidListHdr.key)
    {
      return ERR_IDX_LIST_WRONG_KEY;
    }

    // cout << "IndexList::getRIDArrayFromListHdr->KEY ------>" << key << endl;
    // Check the first row location, 3rd enty
    if (m_curIdxRidListHdr.firstIdxRidListEntry.type == (int)LIST_RID_TYPE)
    {
      ridArray[size] = (RID)m_curIdxRidListHdr.firstIdxRidListEntry.rid;
      // cout<<" IndexList::getRIDArrayFromListHdr->header Lbid->" << m_hdrLbid <<" count->" << arrayCount
      // <<endl;

      size++;
      // cout << "RID = " << (RID)m_curIdxRidListHdr.firstIdxRidListEntry.rid << endl;
    };

    // Check Header last entry's type
    int type = m_curIdxRidListHdr.nextIdxRidListPtr.type;

    switch (type)
    {
      case LIST_RID_TYPE:  // There is a row id here, Check!
        ridArray[size] = (RID)m_curIdxRidListHdr.nextIdxRidListPtr.llp;
        // cout<<"arrayCount->" << arrayCount << "rid->" << ridArray[arrayCount]<<endl;
        size++;
        // cout << "RID = " << (RID)m_curIdxRidListHdr.nextIdxRidListPtr.llp << endl;
        return NO_ERROR;

      case LIST_SUBBLOCK_TYPE:  // Not found in header, so go to the sub-block
        // get thelbid sbid and entry out from the header last entry

        m_lbid = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->fbo;
        m_sbid = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->sbid;
        m_entry = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->entry;
        m_curType = type;
        // Read the pointer entry at LIST_SUB_LLP_POS location

        memset(rowIdArray, 0, BYTE_PER_BLOCK);

        pos = LIST_SUB_LLP_POS;
        totalbytes = SUBBLOCK_TOTAL_BYTES;

        if (m_lbid != m_hdrLbid)
        {
          rc = readSubBlockEntry(cb, &m_curBlock, m_lbid, m_sbid, 0, totalbytes, rowIdArray);
          m_curBlock.lbid = m_lbid;
          m_curBlock.state = BLK_READ;
          m_curBlock.dirty = true;
        }
        else
          getSubBlockEntry(m_hdrBlock.data, m_sbid, 0, totalbytes, rowIdArray);

        lastIdxRidListPtr = (IdxRidListPtr*)&rowIdArray[pos];
        type = lastIdxRidListPtr->type;
        count = lastIdxRidListPtr->count;

        // cout << "count->" << count << endl;
        // type should be LIST_BLOCK_TYPE from now on
        for (int i = 0; i < count; i++)
        {
          ridArray[size] = (RID)(rowIdArray[i].rid);
          // cout << "RID =" << (RID)(rowIdArray[i].rid) << endl;
          // cout<<"arrayCount->" << arrayCount << "rid->" << ridArray[arrayCount]<<endl;
          size++;
        }

        // cout << "    Lbid->" << m_lbid ;
        // cout << "    count->" << count << endl;
        m_lbid = ((IdxEmptyListEntry*)lastIdxRidListPtr)->fbo;
        m_curType = type;
    }  // end of switch
  }    // end if flag

  if (m_curType == LIST_BLOCK_TYPE)
  {
    pos = LIST_BLOCK_LLP_POS;
    totalbytes = BYTE_PER_BLOCK;
    rc = readSubBlockEntry(cb, &m_curBlock, m_lbid, 0, 0, totalbytes, rowIdArray);
    m_curBlock.lbid = m_lbid;
    m_curBlock.state = BLK_READ;
    m_curBlock.dirty = true;

    nextIdxListPtr = (IdxRidNextListPtr*)&rowIdArray[pos];
    type = nextIdxListPtr->type;

    count = nextIdxListPtr->count;

    for (int i = 0; i < count; i++)
    {
      ridArray[size] = (RID)(rowIdArray[i].rid);
      size++;
    }

    IdxRidListArrayPtr idxRidListArrayPtr;
    int curLevel = 0, curCount = 0;

    memset(&idxRidListArrayPtr, 0, sizeof(idxRidListArrayPtr));
    getSubBlockEntry(m_curBlock.data, 0, BEGIN_LIST_BLK_LLP_POS, LIST_BLK_LLP_ENTRY_WIDTH,
                     &idxRidListArrayPtr);
    curLevel = idxRidListArrayPtr.nextIdxListPtr.curLevel;
    curCount = idxRidListArrayPtr.nextIdxListPtr.count;

    for (int i = 0; i < TOTAL_NUM_ARRAY_PTR; i++)
    {
      m_lbid = idxRidListArrayPtr.childIdxRidListPtr[i].childLbid;
      int type = idxRidListArrayPtr.childIdxRidListPtr[i].type;

      if ((m_lbid != (uint64_t)INVALID_LBID) && (type == LIST_BLOCK_TYPE))
      {
        m_curType = LIST_BLOCK_TYPE;
        getRIDArrayFromListHdrNarray(pFile, key, curIdxRidListHdrPtr, ridArray, size, false);
      }
    }

  }  // end if block

  return rc;
}  // end getRIDArrayFromListHdrNarray

/*---------------------------------------------------------------------
 * Description:  Output the info of this company to an abstract stream
 *
 * Input:        None
 *
 * Output:       None
 *
 * Comments:     None
 *
 * Return Code:  the abstract output stream
 *------------------------------------------------------------------*/
ostream& operator<<(ostream& os, IndexList& rhs)
{
  os << rhs.m_curIdxRidListHdr.idxRidListSize.size << "\t" << rhs.m_curIdxRidListHdr.key << "\t"
     << rhs.m_curIdxRidListHdr.firstIdxRidListEntry.type << "\t"
     << rhs.m_curIdxRidListHdr.firstIdxRidListEntry.rid << "\t"
     << rhs.m_curIdxRidListHdr.nextIdxRidListPtr.type << "\t" << rhs.m_curIdxRidListHdr.nextIdxRidListPtr.llp
     << "\t" << endl;
  return os;
}
/************************************************
 * Description:
 * Find a entry for the given rowId and Key
 * Converted
 * input
 *     pFile       -- File Handler
 *     rowId       -- row id
 *     key         -- value
 *     curIdxRidListHdrPtr - point to the header
 * output
 *     lbid       -- File block id
 *     sbid      -- Sub Block id
 *     entry     -- Entry id
 *
 *
 * return value
 *        true --found
 *        false--not found
 ************************************************/
bool IndexList::findRowId(FILE* pFile, const RID& rowId, const uint64_t& key,
                          IdxEmptyListEntry* curIdxRidListHdrPtr, uint64_t& lbid, int& sbid, int& entry)
{
  bool found = false;
  int rc;

  m_pFile = pFile;
  rc = getHdrInfo(curIdxRidListHdrPtr);

  if (key != m_curIdxRidListHdr.key)
  {
    return false;
  }

  found = findRowId(rowId, key, lbid, sbid, entry);
  return found;
}

/************************************************
 * Description:
 * Find a entry for the given rowId and Key
 * Converted
 * input
 *     pFile       -- File Handler
 *     rowId       -- row id
 *     key         -- value
 *     curIdxRidListHdrPtr - point to the header
 * output --
 *     lbid       -- File block id
 *     sbid      -- Sub Block id
 *     entry     -- Entry id
 *
 * return value
 *        true --found
 *        false--not found
 ************************************************/
bool IndexList::findRowId(const RID& rowId, const int64_t& key, int64_t& lbid, int& sbid, int& entry)
{
  bool found = false;
  int rc;
  RID savedRid;
  CommBlock cb;
  int count;
  uint64_t prevLbid;
  int prevType;

  cb.file.oid = m_oid;
  cb.file.pFile = m_pFile;
  // Check the first row location, 3rd enty--0,1,2

  if (m_curIdxRidListHdr.firstIdxRidListEntry.type == (int)LIST_RID_TYPE)
  {
    if (m_curIdxRidListHdr.firstIdxRidListEntry.rid == rowId)
    {
      lbid = m_hdrLbid;
      sbid = m_hdrSbid;
      entry = m_hdrEntry + 2;
      found = true;
      return found;
    }
  };  // endif type=LIST_RID_TYPE

  // Check Header last entry's type
  int type = m_curIdxRidListHdr.nextIdxRidListPtr.type;

  int pos = 0, totalbytes = 0;

  switch (type)
  {
    case LIST_NOT_USED_TYPE:  // Header is not full, no sub-block linked
      // No RowId here then on
      lbid = -1LL;
      sbid = -1;
      entry = -1;
      found = false;
      return found;  // not found

    case LIST_RID_TYPE:  // There is a row id here, Check!

      savedRid = (RID)m_curIdxRidListHdr.nextIdxRidListPtr.llp;

      if (savedRid == rowId)
      {
        lbid = m_hdrLbid;
        sbid = m_hdrSbid;
        entry = m_hdrEntry + 3;
        found = true;
        return found;
      }
      else
      {
        lbid = -1LL;
        sbid = -1;
        entry = -1;
        found = false;
        return found;
      }

    case LIST_SUBBLOCK_TYPE:  // Not found in header
      // get the lbid sbid and entry out from the header last entry

      m_lbid = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->fbo;
      m_sbid = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->sbid;
      m_entry = ((IdxEmptyListEntry*)&(m_curIdxRidListHdr.nextIdxRidListPtr))->entry;

      // Read the pointer entry at LIST_SUB_LLP_POS
      // reserve enough space for rowIdArray
      IdxRidListPtr* lastIdxRidListPtr;
      IdxRidListEntry rowIdArray[MAX_BLOCK_ENTRY];
      memset(rowIdArray, 0, BYTE_PER_BLOCK);
      // first link
      pos = LIST_SUB_LLP_POS;
      totalbytes = SUBBLOCK_TOTAL_BYTES;

      // check if the sub block is on the header block
      if (m_lbid != m_hdrLbid)
      {
        rc = readSubBlockEntry(cb, &m_curBlock, m_lbid, m_sbid, 0, totalbytes, rowIdArray);
        m_curBlock.dirty = true;
        m_curBlock.state = BLK_READ;
      }
      else
      {
        getSubBlockEntry(m_hdrBlock.data, m_sbid, 0, totalbytes, rowIdArray);
      }

      lastIdxRidListPtr = (IdxRidListPtr*)&rowIdArray[pos];

      prevLbid = m_lbid;                 // sub block
      prevType = type;                   // sub block
      count = lastIdxRidListPtr->count;  // current count
      type = lastIdxRidListPtr->type;    // block
      found = false;

      // look inside the first sub block
      for (int i = 0; i < count; i++)
      {
        if (rowIdArray[i].rid == rowId)
        {
          found = true;
          lbid = m_lbid;
          sbid = m_sbid;
          entry = i;
          return found;
        }
      }

      while ((!found) && (type == LIST_BLOCK_TYPE))
      {
        // There are more to check on the next link
        m_lbid = ((IdxEmptyListEntry*)lastIdxRidListPtr)->fbo;
        m_sbid = 0;
        m_entry = 0;

        pos = LIST_BLOCK_LLP_POS;
        totalbytes = BYTE_PER_BLOCK;

        if ((m_lbid != m_hdrLbid) && (m_lbid != prevLbid))
        {
          // the only case for block
          rc = readSubBlockEntry(cb, &m_curBlock, m_lbid, m_sbid, 0, totalbytes, rowIdArray);
          m_curBlock.dirty = true;
        }
        else
        {
          printf("error in findRowID\n");
          return false;
        }

        prevType = type;
        lastIdxRidListPtr = (IdxRidListPtr*)&rowIdArray[pos];
        type = lastIdxRidListPtr->type;
        count = lastIdxRidListPtr->count;
        found = false;

        for (int i = 0; i < count; i++)
        {
          if (rowIdArray[i].rid == rowId)
          {
            found = true;
            lbid = m_lbid;
            sbid = m_sbid;
            entry = i;
            return found;
          }
        }  // end for i

        prevLbid = m_lbid;
      }  // end while

      break;

    default: printf("FIND ROWID got no where, error out !! \n"); break;
  };  // end switch

  lbid = INVALID_LBID;

  sbid = INVALID_NUM;

  entry = INVALID_NUM;

  found = false;

  return found;
}  // end function

}  // namespace WriteEngine
